package com.uyuni.rpc.client.provider;

import com.uyuni.rpc.common.exception.remoting.RemotingException;
import com.uyuni.rpc.common.transport.body.AckCustomBody;
import com.uyuni.rpc.common.utils.SystemClock;
import com.uyuni.rpc.transport.model.RemotingTransporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static com.uyuni.rpc.common.serialization.SerializerHolder.serializerImpl;

/**
 * @author BazingaLyn
 * @description Provider-side controller dedicated to connecting to the registry; it handles all interaction between the provider and the registry
 * @time 2016-08-16
 * @modifytime
 */
public class RegistryController {

    private static final Logger logger = LoggerFactory.getLogger(RegistryController.class);

    /** Separator between individual registry addresses in the configured address string. */
    private static final String ADDRESS_SEPARATOR = ",";

    /**
     * Timeout argument handed to {@code invokeSync} when publishing a service.
     * NOTE(review): presumably milliseconds - confirm against the remoting client.
     */
    private static final int PUBLISH_INVOKE_TIMEOUT = 3000;

    private final DefaultProvider defaultProvider;

    /**
     * Publish requests that have been sent but not (yet) successfully acknowledged.
     * The key combines the registry address and the request id (see {@link #nonAckKey}),
     * because the same request is sent to every configured registry: keying by request id
     * alone would let an ack from one registry erase the pending entry of another.
     */
    private final ConcurrentMap<String, MessageNonAck> messagesNonAcks = new ConcurrentHashMap<>();

    public RegistryController(DefaultProvider defaultProvider) {
        this.defaultProvider = defaultProvider;
    }

    /**
     * Publishes every service returned by the provider to every configured registry address.
     * <p>
     * The registry address string is split on commas; each entry is trimmed and blank entries
     * are skipped. Nothing is sent (only a warning is logged) when there are no services to
     * publish or no usable registry address.
     * <p>
     * An exception thrown by a single publish aborts the loop, so services/addresses that
     * come later in the iteration are not attempted in that call.
     *
     * @throws InterruptedException propagated from the synchronous invoke
     * @throws RemotingException    propagated from the synchronous invoke
     */
    public void publishedAndStartProvider() throws InterruptedException, RemotingException {

        // Services that need to be published
        List<RemotingTransporter> transporters = defaultProvider.getPublishRemotingTransporters();

        if (null == transporters || transporters.isEmpty()) {
            logger.warn("service is empty please call DefaultProvider #publishService method");
            return;
        }
        // Configured registry address(es)
        String address = defaultProvider.getRegistryAddress();

        if (address == null) {
            logger.warn("registry center address is empty please check your address");
            return;
        }
        // Register every service with every registry address
        boolean anyUsableAddress = false;
        for (String rawAddress : address.split(ADDRESS_SEPARATOR)) {
            // Tolerate "host:port, host:port" and stray separators such as "a,,b" or "".
            String eachAddress = rawAddress.trim();
            if (eachAddress.isEmpty()) {
                continue;
            }
            anyUsableAddress = true;
            for (RemotingTransporter request : transporters) {
                pushPublishServiceToRegistry(request, eachAddress);
            }
        }
        if (!anyUsableAddress) {
            logger.warn("registry center address is empty please check your address");
        }
    }

    /**
     * Publishes one service to one registry.
     * <p>
     * The request is recorded as un-acked before it is sent and is removed again only when
     * the registry answers with a successful ack; in every other case (no response, missing
     * or unsuccessful ack, exception) it stays recorded so that
     * {@link #checkPublishFailMessage()} can send it again.
     *
     * @param request     the publish request for the service
     * @param eachAddress the registry address to send it to
     * @throws InterruptedException propagated from the synchronous invoke
     * @throws RemotingException    propagated from the synchronous invoke
     */
    private void pushPublishServiceToRegistry(RemotingTransporter request, String eachAddress) throws InterruptedException, RemotingException {
        logger.info("[{}] transporters matched", request);
        // Remember the publish as un-acked until the registry confirms it
        messagesNonAcks.put(nonAckKey(eachAddress, request.getOpaque()), new MessageNonAck(request, eachAddress));
        // Send synchronously and wait for the registry's response
        RemotingTransporter response = defaultProvider.getNettyRemotingClient().invokeSync(eachAddress, request, PUBLISH_INVOKE_TIMEOUT);
        if (null == response) {
            // No response object came back; the existing log message treats this as a timeout
            logger.warn("registry center handler timeout");
            return;
        }
        // Deserialize the registry's ack
        AckCustomBody ackCustomBody = serializerImpl().readObject(response.bytes(), AckCustomBody.class);

        logger.info("received ack info [{}]", ackCustomBody);
        // Only a successful ack clears the pending entry; guard against a missing ack body
        if (null != ackCustomBody && ackCustomBody.isSuccess()) {
            messagesNonAcks.remove(nonAckKey(eachAddress, ackCustomBody.getRequestId()));
        }
    }

    /**
     * Re-sends every publish request that is still recorded as un-acked.
     * <p>
     * The pending map is modified while it is being iterated (each re-send puts and possibly
     * removes its own entry); this relies on {@link ConcurrentHashMap}'s weakly consistent
     * iterators, which do not throw {@code ConcurrentModificationException}.
     *
     * @throws InterruptedException propagated from the synchronous invoke
     * @throws RemotingException    propagated from the synchronous invoke
     */
    public void checkPublishFailMessage() throws InterruptedException, RemotingException {
        if (messagesNonAcks.isEmpty()) {
            return;
        }
        logger.warn("have [{}] message send failed,send again", messagesNonAcks.size());
        for (MessageNonAck ack : messagesNonAcks.values()) {
            pushPublishServiceToRegistry(ack.getMsg(), ack.getAddress());
        }
    }

    /**
     * Builds the key under which a pending publish is tracked.
     *
     * @param address   registry address the request was sent to
     * @param requestId id of the request (its opaque value / the id echoed in the ack)
     * @return a key unique per (registry address, request id) pair
     */
    private static String nonAckKey(String address, long requestId) {
        return address + '#' + requestId;
    }


    /**
     * A publish request that has been sent to a registry and is awaiting a successful ack.
     */
    static class MessageNonAck {

        /** The request's opaque value, captured at construction time. */
        private final long id;

        /** The publish request itself, kept so it can be re-sent. */
        private final RemotingTransporter msg;
        /** The registry address the request was sent to. */
        private final String address;
        /** Clock reading taken when this record was created. */
        private final long timestamp = SystemClock.millisClock().now();

        public MessageNonAck(RemotingTransporter msg, String address) {
            this.msg = msg;
            this.address = address;

            id = msg.getOpaque();
        }

        public long getId() {
            return id;
        }

        public RemotingTransporter getMsg() {
            return msg;
        }

        public String getAddress() {
            return address;
        }

        public long getTimestamp() {
            return timestamp;
        }

    }

}
